agentmux_srv\backend\storage\filestore/
offset_ops.rs1use std::collections::HashMap;
8
9use rusqlite::params;
10
11use super::core::{FileStore, PART_DATA_SIZE};
12use crate::backend::storage::error::StoreError;
13
14impl FileStore {
15 #[allow(dead_code)]
18 pub fn write_at(
19 &self,
20 zone_id: &str,
21 name: &str,
22 offset: i64,
23 data: &[u8],
24 ) -> Result<(), StoreError> {
25 if data.is_empty() {
26 return Ok(());
27 }
28
29 let key = (zone_id.to_string(), name.to_string());
30 let now = Self::now_ms();
31
32 let file = self.stat(zone_id, name)?.ok_or(StoreError::NotFound)?;
33 if offset > file.size {
34 return Err(StoreError::Other(format!(
35 "offset {} exceeds file size {}",
36 offset, file.size
37 )));
38 }
39
40 let new_size = std::cmp::max(file.size, offset + data.len() as i64);
41 let pds = PART_DATA_SIZE as i64;
42
43 let (actual_offset, actual_data) = if file.opts.circular && file.opts.maxsize > 0 {
45 let start_cir_offset = new_size - file.opts.maxsize;
46 if start_cir_offset > 0 {
47 let end = offset + data.len() as i64;
48 if end <= start_cir_offset {
49 return Ok(());
51 }
52 if offset < start_cir_offset {
53 let skip = (start_cir_offset - offset) as usize;
54 (start_cir_offset, &data[skip..])
55 } else {
56 (offset, data)
57 }
58 } else {
59 (offset, data)
60 }
61 } else {
62 (offset, data)
63 };
64
65 let start_part = (actual_offset / pds) as i32;
67 let end_part = ((actual_offset + actual_data.len() as i64 - 1) / pds) as i32;
68
69 let conn = self.conn.lock().unwrap();
70 let mut data_pos = 0usize;
71
72 for part_idx in start_part..=end_part {
73 let part_start = part_idx as i64 * pds;
74 let offset_in_part = if part_idx == start_part {
75 (actual_offset - part_start) as usize
76 } else {
77 0
78 };
79
80 let existing: Option<Vec<u8>> = conn
82 .query_row(
83 "SELECT data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 AND partidx = ?3",
84 params![zone_id, name, part_idx],
85 |row| row.get(0),
86 )
87 .ok();
88
89 let mut part_data = existing.unwrap_or_default();
90 if part_data.len() < offset_in_part {
92 part_data.resize(offset_in_part, 0);
93 }
94
95 let remaining = actual_data.len() - data_pos;
97 let space = PART_DATA_SIZE - offset_in_part;
98 let to_copy = remaining.min(space);
99
100 if offset_in_part < part_data.len() {
101 let overwrite_end = (offset_in_part + to_copy).min(part_data.len());
103 let overwrite_len = overwrite_end - offset_in_part;
104 part_data[offset_in_part..offset_in_part + overwrite_len]
105 .copy_from_slice(&actual_data[data_pos..data_pos + overwrite_len]);
106 if to_copy > overwrite_len {
107 part_data.extend_from_slice(
108 &actual_data[data_pos + overwrite_len..data_pos + to_copy],
109 );
110 }
111 } else {
112 part_data.extend_from_slice(&actual_data[data_pos..data_pos + to_copy]);
113 }
114
115 conn.execute(
116 "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
117 params![zone_id, name, part_idx, part_data],
118 )?;
119 data_pos += to_copy;
120 }
121
122 conn.execute(
124 "UPDATE db_wave_file SET size = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
125 params![new_size, now, zone_id, name],
126 )?;
127 drop(conn);
128
129 let mut cache = self.cache.lock().unwrap();
131 if let Some(entry) = cache.get_mut(&key) {
132 if let Some(ref mut f) = entry.file {
133 f.size = new_size;
134 f.modts = now;
135 }
136 for part_idx in start_part..=end_part {
138 entry.data_entries.remove(&part_idx);
139 }
140 }
141
142 Ok(())
143 }
144
145 pub fn read_at(
149 &self,
150 zone_id: &str,
151 name: &str,
152 offset: i64,
153 size: i64,
154 ) -> Result<(i64, Vec<u8>), StoreError> {
155 let file = self
156 .stat(zone_id, name)?
157 .ok_or(StoreError::NotFound)?;
158
159 if file.size == 0 {
160 return Ok((0, Vec::new()));
161 }
162
163 let data_len = file.data_length();
164 let data_start = file.data_start_idx();
165
166 let mut actual_offset = offset;
168 let mut actual_size = if size == 0 { data_len } else { size };
169
170 if file.opts.circular && file.opts.maxsize > 0 {
171 if actual_offset < data_start {
172 let skip = data_start - actual_offset;
173 actual_offset = data_start;
174 actual_size -= skip;
175 }
176 if actual_size <= 0 {
177 return Ok((data_start, Vec::new()));
178 }
179 }
180
181 if actual_offset >= file.size {
183 return Ok((actual_offset, Vec::new()));
184 }
185 let available = file.size - actual_offset;
186 actual_size = actual_size.min(available);
187
188 if actual_size <= 0 {
189 return Ok((actual_offset, Vec::new()));
190 }
191
192 let pds = PART_DATA_SIZE as i64;
193 let start_part = (actual_offset / pds) as i32;
194 let end_part = ((actual_offset + actual_size - 1) / pds) as i32;
195
196 let conn = self.conn.lock().unwrap();
198 let mut parts_map: HashMap<i32, Vec<u8>> = HashMap::new();
199 for part_idx in start_part..=end_part {
200 if let Ok(data) = conn.query_row(
201 "SELECT data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 AND partidx = ?3",
202 params![zone_id, name, part_idx],
203 |row| row.get::<_, Vec<u8>>(0),
204 ) {
205 parts_map.insert(part_idx, data);
206 }
207 }
208 drop(conn);
209
210 let mut result = Vec::with_capacity(actual_size as usize);
212 for part_idx in start_part..=end_part {
213 if let Some(part_data) = parts_map.get(&part_idx) {
214 let part_start = part_idx as i64 * pds;
215 let skip = if part_start < actual_offset {
216 (actual_offset - part_start) as usize
217 } else {
218 0
219 };
220 let remaining = actual_size as usize - result.len();
221 let take = remaining.min(part_data.len().saturating_sub(skip));
222 if take > 0 {
223 result.extend_from_slice(&part_data[skip..skip + take]);
224 }
225 }
226 }
227
228 Ok((actual_offset, result))
229 }
230}